General Web Development

Symfony’s new messenger component

Realizing asynchronous processes in Symfony

Denis Brumann

One of the big, new features of Symfony 4 is the messenger component, which was introduced in the last release. Similar to the EventDispatcher, it is sending messages but it can also send and receive messages across the boundaries of the application. Thus, it is offering the potential to introduce event sourcing and CQRS to Symfony applications.

The messenger component implements the message bus design pattern [1]. Essentially, it is about sending and receiving messages. In this manner, an asynchronous communication between applications can be enabled. A message is comparable to the already familiar events of Symfony. It encapsulates data that is transferred from a sender class to a receiver class. The message bus determines which message is transmitted to which receiver. The MessageBusInterface, by which messages are sent, has only one method called dispatch to which you can pass any PHP object as a message. By default, the serializer component converts the objects to a JSON string and passes it to a transport. The message is passed on via a transport in asynchronous processing. The messenger even takes messages out of the transport in order to process them. Two native transports are currently supported by the messenger component. Usually, messages in the same application, in the same PHP process are processed synchronously without any kind of detour. This is very helpful when it comes to debugging. For example, XDebug allows you to easily track how the message is being processed. The second transport uses the AMQP protocol via the according PHP extension, in order to send the message to a message queue and to receive messages from a message-queue. The routing inside the configuration of the bus is controlling which transport is to be used for which message. A CLI command can be executed to process messages of a message queue. Apart from the two supported transports, you can also use adapters to reuse existing transports, for example, from the popular Enqueue library [2]. That is also appropriate if the AMQP extension is not available and a PHP implementation is to be used instead.

A class must be created to process received messages that implements the magic method _invoke, whose only argument is the passed PHP object, i.e. the class of the originally sent message. Then, this class must be registered as a handler in the message bus. In a Symfony application, this happens via a service tag  messenger.messenger_handler. Alternatively, you can implement an empty HandlerInterface so that Symfony’s  dependency injection component can automatically add the tag via auto-configuration.

IPC NEWSLETTER

All news about PHP and web development

 

In order to use a messenger component in a Symfony 4 application, the application must first be installed with Composer. A Flex Recipe creates a basic configuration automatically and provides through it a message bus, which can be used without any further configuration. Commented parts of the configuration also show how the asynchronous AMPQ transport can be configured instead of the local, synchronic processing. The Recipe’s configuration can also be applied in an existing application without Flex. Apart from the bus that is provided by default, it is also possible to define custom buses. This is appropriate, for example, if the buses are supposed to behave differently. And this makes it also possible to define different types of buses, for example, a Command Bus that ensures that a message is handled by exactly one handler. But also an Event Bus that allows it that there is no handler registered for the message. By default, there must be at least one registered handler and messages are passed to all registered handlers.

Middleware

Middleware serve to control the behaviour of a message bus, at the receiving, sending and processing of messages. The messenger component serially registers three middleware on all defined buses.

  • LoggingMiddleware: protocols the processing of messages in the log
  • SendMessageMiddleware: sends messages to transport at asynchronous processing
  • HandleMessageMiddleware: calls a registered handler for the message

These can be deactivated via configuration through the Flag default_middleware. Furthermore, more middleware can be added. Currently, there are two additional middleware available. The ValidationMiddleware ensures that the message contains valid data.
The AllowNoHandlerMiddleware prevents the exception, if no valid handler was registered for the message.

If you write your own middleware, you have to implement the respective interface. Moreover, you should also note that the middleware is processed sequentially. To that extent, the order in that middleware is registered is relevant and only difficult to control. A priority can be assigned to the middleware in order to have some small degree of control. Nevertheless, you should still control the changes made by the middleware, so that they do not produce conflicts with other middleware. Currently, there is no standard definition of a middleware. Because of that, already existing middleware from other libraries, for example Tactician [3], cannot be directly reused. Fortunately, both interfaces are similar enough to each other so that an adapter can be written with little effort. One can also hope, that middleware, which is frequently needed, will be added to the component in later releases.

Distinction from the EventPatcher

At first glance, Symfony’s EventDispatcher and the message bus look very similar in usage. Both of them have a dispatch method to which an object is passed, which is then processed by another class. The similarities are obvious when comparing the EventSubscriberInterface
with the MessageSubscriberInterface. So, what makes the difference between the two components?

Apart from some fundamental differences – for example, that the EventDispatcher is expecting an instance of the event class as a message – the main difference is the restriction to the synchronous processing within the current process. Furthermore, there is also the option that you can stop the further processing of an event. An EventDispatcher, similar to the Symfony component, can be realized with the message bus, but not every message bus can be transmitted on to the EventDispatcher. If you intervene during the event cycle
of the Symfony framework or utilize the Doctrine lifecycle events, then you still are depending on the EventDispatcher.

If you have your own event system, which is not depended on breaking the event flow, you can still – with high probability – migrate this onto the message bus. You must decide for yourself, if this is appropriate. However, a careless switch to asynchronous processing can have serious consequences since, for example, this can lead to race conditions.

YOU LOVE PHP?

Explore the PHP Core Track

 

A case study for the use

We are considering a user registration as an example application. The registration is delegated via a command bus in order to be able to execute it later asynchronously. To do so, the necessary information is encapsulated within a signup message and passed to a message queue. If a registration is currently not possible, for example, because the database has failed, the requests are collected and processed as soon as the error has been corrected. After a successful registration, a SignupCompleted message is transmitted to an event bus. The event bus is responsible for triggering secondary processes, such as sending a confirmation mail or transferring the data to connected systems – a CRM would be conceivable. You can find the complete example code in a GitHub repository [4].

At first we need a PHP class that can be used as a message. Since it is a DTO, we only need public attributes like name, email address and password (Listing 1). As long as we use the messenger component without a transport, we have complete control of the data. However,
if you are using AMQP, it is you who is responsible for providing an adequate protection for sensitive information, such as passwords or data protection relevant content (e.g. names and email addresses), for both the communication path and also in the queue. Middleware, your own transmitters or receivers that are encrypting and decrypting data are conceivable options.


namespace App\Message;
 
use Symfony\Component\Validator\Constraints as Assert;
 
final class SignupCommand
{
  /**
    * @Assert\NotBlank()
    */
  private $firstName;
  /**
    * @Assert\NotBlank()
    */
  private $lastName;
  /**
    * @Assert\NotBlank()
    * @Assert\Email()
    */
  private $email;
  /**
    * @Assert\NotBlank()
    * @Assert\Length(min=6)
    */
  private $password;
 
  // ... getters & setters
}

In order to send our message, we instance our message with the data of a form and
then we dispatch it via our configured command message bus (Listing 2).


public function signup(Request $request, MessageBusInterface $commandBus)
{
  $form = $this->createForm(SignupFormType::class);
  $form->handleRequest($request);
  if ($form->isSubmitted() && $form->isValid()) {
    $message = new \App\Message\SignupCommand(
      $form->get('firstName')->getData(),
      $form->get('lastName')->getData(),
      $form->get('emailAddress')->getData(),
      $form->get('password')->getData()
    );
    $commandBus->dispatch($message);
  }
  return $this->render('signup.html.twig', ['form' => $form->createView()]);
}

Next, we need a message handler that receives and processes the message.
There are currently three ways to create a handler. Probably the easiest way is to write
a class that implements the MessageHandlerInterface and provides a method __invoke(Signup $message)(Listing 3). Alternatively, you can do without the interface and declare
a service tag in the app/config/services.yaml. The third possibility is to implement the MessageSubscriberInterface, which works similar to the EventSubscriber.


namespace App\MessageHandler;
 
use App\Entity\User;
use App\Message\SignupCommand;
use App\Message\SignupCompletedEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Security\Core\Encoder\UserPasswordEncoderInterface;
 
final class SignupHandler
{
  private $entityManager;
  private $passwordEncoder;
  private $eventBus;
  public function __construct(
    EntityManagerInterface $entityManager,
    UserPasswordEncoderInterface $passwordEncoder,
    MessageBusInterface $eventBus
  ) {
    $this->entityManager = $entityManager;
    $this->passwordEncoder = $passwordEncoder;
    $this->eventBus = $eventBus;
  }
 
  public function __invoke(SignupCommand $command): void
  {
    $user = new User($command->getEmail());
    $encodedPassword = $this->passwordEncoder->encodePassword($user, $command->getPassword());
    $user->updatePassword($encodedPassword);
 
    $this->entityManager->persist($user);
    $this->entityManager->flush();
 
    $event = SignupCompletedEvent::fromSignupCommand($command);
    $this->eventBus->dispatch($event);
  }
}

Apart from the fact that the class must be invokable, there are no other requirements for a message handler. The handler is a separate service. Dependencies, like Doctrine’s EntityManager, can be added via constructor injection. This includes Message Buses. In other words, we can send messages that are sending messages, and so on. This allows us to inform our event handlers in the signup command handler that the registration is complete. The Message Bus prevents endless loops, which occur when a message that has already been sent is to be resent. If several buses are used, then it is advisable to consider a naming convention for messages, in order to easily distinguish which bus is to be used to dispatch the message. In our case, we use both a suffix that marks the type and also the past tense to distinguish events from commands by name. In each case, the routing configuration determines which transport is used, and the type Hint at the handler determines which message the handler is responsible for. Therefore, the used bus only influences the middleware that have been cycled through.

IPC NEWSLETTER

All news about PHP and web development

 

Just like mentioned in the beginning, there are different conceptual requirements for our message buses. We create the appropriate buses and assign them middleware. However, if you are using multiple buses, you must support Symfony’s auto-wiring so that it injects the correct instance.

In order to so, it is recommended that you couple an instance to an argument via bind [5]. In other words: Whenever a MessageBusInterface that uses the name parameter $commandBus is to be injected, the message bus configured for this purpose is passed.
If the parameter is named $eventBus, the instance configured for this purpose, is injected.

It is appropriate for the Event Bus to activate NoHandlerAllowedMiddleware because – via convention – it is valid to not assign a handler to an event. If we haven’t defined a handler, we should not get an exception, unlike the Command Bus. If we react to the event, we should see it in the logs that firstly the processing of the signup message begins, then subsequently the SignupCompleted message is processed and completed, and finally the processing of the signup message is also completed.

Asynchronous processing via message queue   

For the asynchronous processing of messages we require a message broker that supports the AMQP protocol. A common solution is RabbitMQ. After the installation, you can store the DSN string, which is required for the communication with the message broker, in its configuration. This looks similar to the DATABASE_URL for Doctrine ORM. The DSN should look something like this, if RabbitMQ was locally installed: amqp://guest:guest@localhost:5672/%2f/messages.

If the transport was created in the configuration, it is not yet used automatically. Instead, a transport must be passed for each message type. If you want to use the same transport for all the messages, you can use the *-Wildcard. In order to continue to use the synchronous processing for single messages without transport, you can pass zero as a value. If you want to send a message via several transports, you can pass a list of IDs instead of a single transport. In the context of our approach, you could also, for example, establish a query bus, which represents the complementary to a command. Queries are usually processed synchronously, which means that they aren’t sent to a message broker. In order to simplify the routing, you can establish a collection interface for the message types and then assign them a transport. Listing 4 shows an example of such a configuration.


framework:
  messenger:
    transports:
      commands: '%env(MESSENGER_TRANSPORT_DSN)%/commands'
      events: '%env(MESSENGER_TRANSPORT_DSN)%/events'
    routing:
      App\Message\SignupCommand: ['commands']
      App\Message\SignupCompletedEvent: ['events']
    default_bus: app.messenger.command_bus
    buses:
      app.messenger.command_bus:
        middleware:
          - messenger.middleware.validation
      app.messenger.event_bus:
        middleware:
          - messenger.middleware.validation
          - messenger.middleware.allow_no_handler

The configured handlers are not called automatically when a transport is used. If you want to process the message, then you must retrieve these out of the queue via a Receiver. In most cases, you use a CLI application to do this. The messenger component registers a custom command, messenger:consume-messages, which you can execute via bin/console. If several transports have been defined, then you can pass the favoured receiver transport as an argument. If you don’t do this, then you have to choose one from a list. Through a variety of options you can also control the consumer’s lifetime. The consumer can be stopped automatically either after a processing a certain number of messages, a certain time, or when reaching memory limit. A new process can then be started automatically using a process manager such as systemd. If you activate verbose output, you can see which messages are being processed. Next to asynchronous message processing, you can also scale message processing by starting other processes.

One advantage of asynchronous message processing is that an error does not cause our application to terminate. But the disadvantage is that the worker process terminates and the message processing is stopped. The defective message remains in the queue and is processed again when a worker is started. This leads to an error again and thus again to the termination. In order to avoid such a loop, you can configure so-called Dead Letter Exchanges in RabbitMQ. With a bit of code, a similar mechanism can be implemented within your own application. The starting point is a Middleware with a generic try-catch-block. In the case of an exception, the current message is encapsulated in a RetryMessage class and then dispatched by Message Bus. This way all defective messages are collected, can be analysed and then pushed back into the original queue via a RetryMessageHandler or converted back into the original message. The example repository [6] contains an exemplary, naive implementation of this principle.

 

Conclusion

Symfony’s messenger component makes it easier than ever to introduce asynchronous processing into an existing application. Limiting factors are the limitation to the AMQP extension as transport and a few middleware. Additional transports can be added via the adapter provided by Enqueue or custom senders, receivers and transports can be written. However, it is to be hoped that standardized solutions will establish themselves in the community. For me, the seamless switch from synchronous to asynchronous processing by adjusting the configuration is particularly interesting, since it enables you to analyse the processing locally and thus recognize elementary problems in asynchronous processing at an early stage. Since debugging over several processes can be very exhausting, this makes it easier to start developing distributed applications.  With the introduction of Flex, Symfony 4 has focused on the use of PHP and Symfony in service-based infrastructures, and the messenger component is a welcome addition to the ecosystem to make event-driven services easier to implement.

 

 

Links & Literature

[1] Enterprise Integration Patterns, „Message Bus“: https://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html

[2] Enqueue: https://enqueue.forma-pro.com/

[3] Tactician, „Introduction“: tactician.thephpleague.com/

[4] GitHub, „Example Application for the Symfony messenger component“: https://github.com/sensiolabs-de/messenger-signup-demo

[5] Symfony, „New in Symfony 3.4: Local service binding“: https://symfony.com/blog/new-in-symfony-3-4-local-service-binding

[6] GitHub: https://github.com/sensiolabs-de/messenger-signup-demo/commit/6a76da25f98f9187f5d09f02fc3405b785f2aa89

Top Articles About General Web Development

Stay tuned!

Register for our newsletter

Behind the Tracks of IPC

PHP Core
Best practices & applications

General Web Development
Broader web development topics

Test & Performance
Software testing and performance improvements

Agile & People
Getting agile right is so important

Software Architecture
All about PHP frameworks, concepts &
environments

DevOps & Deployment
Learn about DevOps and transform your development pipeline

Content Management Systems
Sessions on content management systems

#slideless (pure coding)
See how technology really works

Web Security
All about
web security

PUSH YOUR CODE FURTHER